"""This module deals with flags on the DFS. It can add, mark as fresh, delete,
mark as complete 

DFS:
    \jobs
        \jobs.flag
        \001
        \002
        \[JobId]
            \map.flag
            \reduce.flag
            \job.xml
            \inputfile0
            \inputfile1
            \inputfile2

"""

# TODO: Need to make function which tells if there are any maps or reduces left
#  Will check if all stubs are either done or have failed


import os
import os.path
import time
import operator
import ma.log
import ma.net.distlock
import ma.const
import ma.fs.dfs._hdfs
import ma.utils.listutils as listutils
import ma.commons.core.constants as constants


class DfsFlags(object):
    """DfsFlags class used to deal with stubs on the DFS.
    
    It can be used to create, delete, update jobs and tasks alike.
    
    Flag files (map.flag, reduce.flag, jobs.flag) are XML-like text; this
    class parses and edits them with raw string searches over the tag
    constants below rather than an XML parser, so the tag strings and the
    printf-style templates must stay exactly in sync with the file format.
    """
    
    # printf-style template for a brand-new map.flag file; the %s receives
    # the concatenated <mt> stubs.
    __mapflag_template = '<?xml version="1.0" encoding="utf-8"?>\n<!DOCTYPE map_flags SYSTEM "mapflags.dtd">\n<map_flags>\n%s<inptd_mts></inptd_mts>\n</map_flags>\n'
    
    # Tag constants and their precomputed lengths, used for string-search
    # parsing/editing of the flag files. Abbreviations (as used below):
    # mt=map task, rt=reduce task, id=task id, pu=picked up, dn=done,
    # fl=failed count, strc=struct id, inp/in=inputs, fn=filename,
    # offs=offset, sz=size, ts=timestamp, lvl=level,
    # inptd_mts/inptd_rts=map/reduce tasks already input to reduces.
    # NOTE(review): abbreviation meanings inferred from usage in the methods
    # below -- confirm against the flag-file DTD.
    __mt_tag = '<mt>'
    __end_mt_tag = '</mt>'
    __end_mt_len = len(__end_mt_tag)
    __rt_tag = '<rt>'
    __end_rt_tag = '</rt>'
    __end_rt_len = len(__end_rt_tag)
    __id_tag = '<id>'
    __id_len = len(__id_tag)
    __end_id_tag = '</id>'
    __pu_tag = '<pu>'
    __pu_len = len(__pu_tag)
    __end_pu_tag = '</pu>'
    __dn_tag = '<dn>'
    __dn_len = len(__dn_tag)
    __end_dn_tag = '</dn>'
    __fl_tag = '<fl>'
    __fl_len = len(__fl_tag)
    __end_fl_tag = '</fl>'
    __strct_tag = '<strc>'
    __strct_len = len(__strct_tag)
    __end_strct_tag = '</strc>'
    __inp_tag = '<inp>'
    __inp_len = len(__inp_tag)
    __end_inp_tag = '</inp>'
    __in_tag = '<in>'
    __in_len = len(__in_tag)
    __end_in_tag = '</in>'
    __fn_tag = '<fn>'
    __fn_len = len(__fn_tag)
    __end_fn_tag = '</fn>'
    __offs_tag = '<offs>'
    __offs_len = len(__offs_tag)
    __end_offs_tag = '</offs>'
    __sz_tag = '<sz>'
    __sz_len = len(__sz_tag)
    __end_sz_tag = '</sz>'
    __ts_tag = '<ts>'
    __ts_len = len(__ts_tag)
    __end_ts_tag = '</ts>'
    __inpt_mts_tag = '<inptd_mts>'
    __inpt_mts_len = len(__inpt_mts_tag)
    __end_inpt_mts_tag = '</inptd_mts>'
    __inpt_rts_tag = '<inptd_rts>'
    __inpt_rts_len = len(__inpt_rts_tag)
    __end_inpt_rts_tag = '</inptd_rts>'
    __lvl_tag = '<lvl>'
    __lvl_len = len(__lvl_tag)
    __end_lvl_tag = '</lvl>'
    # Generic close-tag prefix; used to locate the end of a value when the
    # specific closing tag is not needed.
    __end_tag = '</'
    __breaker = '\n'
    
    # Template for one <mt> (map task) stub: id, picked-up, done, failed,
    # struct, inputs, timestamp.
    __map_flag_stub = __mt_tag  + __breaker \
                    + __id_tag + '%s' + __end_id_tag + __pu_tag + '%d' + __end_pu_tag \
                    + __dn_tag + '%d' + __end_dn_tag + __fl_tag + '%d' + __end_fl_tag \
                    + __strct_tag + '%s' + __end_strct_tag + __breaker \
                    + __inp_tag + __breaker + '%s' + __end_inp_tag + __breaker \
                    + __ts_tag + '%f' + __end_ts_tag + __breaker \
                    + __end_mt_tag
                    
    # Template for one <in> input entry of a map stub: filename, offset, size.
    __map_flag_input = __in_tag + __fn_tag + '%s' + __end_fn_tag + __offs_tag + '%d' + __end_offs_tag \
                    + __sz_tag + '%d' + __end_sz_tag + __end_in_tag + __breaker
    
    # Template for one <rt> (reduce task) stub: id, picked-up, done, level,
    # failed, struct, inputs, timestamp.
    __reduce_flag_stub = __rt_tag  + __breaker \
                    + __id_tag + '%s' + __end_id_tag + __pu_tag + '%d' + __end_pu_tag \
                    + __dn_tag + '%d' + __end_dn_tag + __lvl_tag + '%d' + __end_lvl_tag \
                    + __fl_tag + '%d' + __end_fl_tag + __strct_tag + '%s' + __end_strct_tag + __breaker \
                    + __inp_tag + '%s' + __end_inp_tag + __breaker \
                    + __ts_tag + '%f' + __end_ts_tag + __breaker \
                    + __end_rt_tag
                    
    
    # dictionary keys used to look up per-job constants in __xml_consts_cache
    __dfs_job_flags_path = 'DFS_JOB_FLAGS_PATH'
    __dfs_map_flags_path = 'DFS_MAP_FLAGS_PATH'
    __dfs_reduce_flags_path = 'DFS_REDUCE_FLAGS_PATH'
    __local_job_flags_path = 'LOCAL_JOB_FLAGS_PATH'
    __local_map_flags_path = 'LOCAL_MAP_FLAGS_PATH'
    __local_reduce_flags_path = 'LOCAL_REDUCE_FLAGS_PATH'
    __map_timeout = 'MAP_TIMEOUT'
    __reduce_timeout = 'REDUCE_TIMEOUT'
    __max_map_attempts = 'MAX_ATTEMPTS_AT_A_MAP_TASK'
    __max_reduce_attempts = 'MAX_ATTEMPTS_AT_A_REDUCE_TASK'
    __max_map_to_red_inputs = 'MAX_MAP_TO_REDUCE_INPUTS'
    __max_red_to_red_inputs = 'MAX_REDUCE_TO_REDUCE_INPUTS'

    # parameters
    # Sentinel for choose_map_tasks: pick every available map, no limit.
    CHOOSE_ALL_MAPS = 0
    
    # default values written into freshly created stubs
    __default_pu = 0
    __default_dn = 0
    __default_fl = 0
    __default_ts = 0
    __default_inptd_mts = ''
    
    __first_map_id = 0
    __first_reduce_id = 0

    # NOTE(review): class-level mutable list -- it is shared across ALL
    # DfsFlags instances (force_job_id appends to it). Presumably intended
    # as per-instance state; confirm before creating multiple instances.
    __job_list = []
    
    def __init__(self, hdfs_ref):
        """Set up the logger, the distributed-lock helper, the HDFS handle
        and the per-job constants cache used by all flag operations.
        """
        # The logger is created first so the except clause below can use it.
        self.__log = ma.log.get_logger("ma.fs")
        
        try:
            self.__dist_lock = ma.net.distlock.DistributedLock()
            self.__hdfs_ref = hdfs_ref
            
            # Per-job constants dictionary, keyed by job id; populated
            # lazily by __cache_consts.
            self.__xml_consts_cache = {}
            
            self.__log.info('Initiated DfsFlags')
        except Exception as exc:
            # Construction is best-effort: failures are logged, not raised.
            self.__log.error("Error while creating Dfs flags object: %s", exc)
        
    
    def create_job(self, local_xml_path):
        """Create a job on the DFS (used by JobClient) from the job XML at
        *local_xml_path*, and return the new JobId.
        
        Intended steps (not implemented yet):
            a. create the job directory
            b. upload job.xml into the job directory
            c. create all map flags in map.flag according to job.xml
            d. make an entry in jobs.flag
        """
        # Not implemented: report job id 1 unconditionally for now.
        return 1
    
    
    def force_job_id(self, job_id):
        """Register *job_id* as an active job without creating anything on
        the DFS: the id is simply appended to the in-memory job list.
        
        Logged as a warning because it bypasses the normal job creation path.
        """
        self.__log.warning('Forcing job id on DFS: %d', job_id)
        self.__job_list.append(job_id)
        
    
    def get_active_job_list(self):
        """Return the list of active job ids (integers).
        
        Intended behaviour (not implemented yet): read jobs.flag and keep
        only the jobs whose <halt> is marked 0. For now the in-memory job
        list is returned as-is.
        """
        return self.__job_list
    
    
    def get_available_map_tasks(self, job_id):
        """Get a list of undone maps. Returns a list of map_ids or None on 
        failure
            a. Get the map.flag file for the specific job
            b. parse it and get all fresh maps, and expired but within no. of 
            failed times threshold
        """
        try:
            # cache the job's xml constants if not already done
            if job_id not in self.__xml_consts_cache:
                self.__cache_consts(job_id)
            
            return_map_task_list = []
            
            # don't move forward till map file is free of other workers' locks
            if not self.__dist_lock.block_till_locked(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]):
                self.__log.error('Unable to read the map flags (job_id %d) continuously locked', job_id)
                return None
            
            local_map_filepath = self.__xml_consts_cache[job_id][self.__local_map_flags_path]
            
            # Delete a stale local copy of the map flags, if still present
            if os.path.isfile(local_map_filepath):
                os.remove(local_map_filepath)
            
            # copy map flag file to local temp dir
            local_dir_path = os.path.dirname(local_map_filepath)
            if not self.__hdfs_ref.copy_file_to_local(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path], local_dir_path, job_id):
                self.__log.error('Unable to copy map flags (job_id %d) from DFS', job_id)
                # BUG FIX: this used to return False; the documented contract
                # is None on failure (as every other failure path here does).
                return None
            
            # read file into string (with-block guarantees the fd is closed)
            with open(local_map_filepath, 'r') as mf_fd:
                mf_str = mf_fd.read()
            
            # walk the <mt> stubs and collect every still-available task.
            # Flag tuple positions assumed from usage: 0=id, 1=pu, 2=dn,
            # 3=fl, 6=ts -- TODO confirm against __get_next_map_flag.
            idx, flag = self.__get_next_map_flag(mf_str)
            while flag is not None:
                # only consider tasks whose failure count is within limits
                # and whose pickup has not timed out
                if self.__task_available(flag[1], flag[2], flag[3], \
                                         self.__xml_consts_cache[job_id][self.__max_map_attempts], \
                                         flag[6], self.__xml_consts_cache[job_id][self.__map_timeout]):
                    return_map_task_list.append(flag[0])
                
                # try to get next map task
                idx, flag = self.__get_next_map_flag(mf_str, idx)
            
            self.__log.debug('Returning the Undone map task list: %s', str(return_map_task_list))
                   
            # return list of undone map tasks
            return return_map_task_list
        
        except Exception as err_msg:
            self.__log.error("Error while getting list of undone map tasks: %s", err_msg)
            return None
    
    
    def choose_map_tasks(self, max_no_maps, job_id):
        """Lock/Choose a map task. Specify job_id and number of maps that need to
        be chosen. Returns list of tuples (map_id, struct_id, [inputs (filename,
        offset, size)]), empty list of none could be locked, and None on failure
            a. lock the map flags for the job of chosen maps
            b. iterate through maps and see which are available
                if it is change <pu> to 1 and set timestamp
            c. Keep iterating through maps available until u have found either the
                number of maps required or no more maps are available
            d. write the new map file to the flag file
            e. copy back to DFS
            f. unlock the map file
        
        Returns the pair (chosen_task_list, no_undone_maps_left); (None, None)
        on any failure. max_no_maps == CHOOSE_ALL_MAPS (0) means "take every
        available map"; a negative value picks nothing.
        NOTE(review): flag tuple positions (1=pu, 2=dn, 3=fl, 4=struct,
        5=inputs, 6=ts) are assumed from usage here -- confirm against
        __get_next_map_flag.
        """
        
        # incase we dont need to pick any maps
        if max_no_maps < 0:
            return None, None
        
        # cache xml if not already done
        if job_id not in self.__xml_consts_cache:
            self.__cache_consts(job_id)
            
        dfs_map_filepath = self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]
        
        # lock map flag file
        try:
            # initialize dict
            return_map_task_list = []
            
            # attempt to lock the map flags file
            if not self.__dist_lock.lock_and_hold(dfs_map_filepath):
                self.__log.warn('Unable to lock file for choosing a map: %s', dfs_map_filepath)
                return None, None
            
            self.__log.debug('Locked file for choosing map: %s', dfs_map_filepath)
            
            # Delete old map flag if still present
            local_map_filepath = self.__xml_consts_cache[job_id][self.__local_map_flags_path]
            if os.path.isfile(local_map_filepath):
                os.remove(local_map_filepath)
            
            # copy map flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(dfs_map_filepath, local_dir_path, job_id):
                self.__log.error('Unable to copy map flags (job_id %d) from DFS', job_id)
                return None, None
            
            mf_fd = open(local_map_filepath, 'r')
            mf_str = mf_fd.read()
            mf_fd.close()
            
            # idx2 trails idx and is the write cursor for the in-place
            # <pu>/<ts> edits below
            idx2 = 0
            
            done_maps, undone_maps, failed_maps = self.__get_map_tasks_statuses(mf_str, self.__xml_consts_cache[job_id][self.__max_map_attempts])
            
            idx, flag = self.__get_next_map_flag(mf_str)
            
            # keep iterating over the file until you find enough maps or reach end of file
            while idx != -1 and (max_no_maps == self.CHOOSE_ALL_MAPS or len(return_map_task_list) < max_no_maps):
                #print "-------------- ", job_id, self.__xml_consts_cache[job_id][self.__map_timeout]    
                # only consider task whose failure rate is within limits
                if self.__task_available(flag[1], flag[2], flag[3], \
                                         self.__xml_consts_cache[job_id][self.__max_map_attempts], \
                                         flag[6], self.__xml_consts_cache[job_id][self.__map_timeout]):
                    
                    # mark as picked up: overwrite the single digit after <pu>
                    idx2 = mf_str.find(self.__pu_tag, idx2)
                    idx2 += self.__pu_len
                    mf_str = mf_str[:idx2] + '1' + mf_str[idx2+1:]
                    
                    # TODO: if timer had expired when being chosen, increase failed
                    
                    # replace the pickup timestamp (variable-length splice)
                    idx2 = mf_str.find(self.__ts_tag, idx2)
                    idx2 += self.__ts_len
                    idx3 = mf_str.find(self.__end_tag, idx2)
                    tx = str(time.time())
                    mf_str = mf_str[:idx2] + tx + mf_str[idx3:]
                    
                    # append tuple of map id, structural info and input
                    return_map_task_list.append((flag[0], flag[4], flag[5]))
                    
                    # adjust idx according to length differences, since the
                    # timestamp splice above may have grown/shrunk the string
                    # before idx's position
                    idx += len(tx) - (idx3 - idx2)
                
                idx2 = idx
                
                # try to get next map task
                idx, flag = self.__get_next_map_flag(mf_str, idx)
                
            
            # if some maps got chosen, write back the map file
            if len(return_map_task_list) > 0:
                # write updated map flag file
                mf_fd = open(local_map_filepath, 'w+')
                mf_fd.write(mf_str)
                mf_fd.close()
            
                self.__log.info('Writing chosen maps to the map flags')
                
                # delete the old flags file on the DFS first; the copy below
                # replaces it (no atomic overwrite available)
                if not self.__hdfs_ref.delete_file(dfs_map_filepath):
                    self.__log.error('Unable to delete the map flags before replacement: %s', dfs_map_filepath)
                    return None, None
                
                dfs_dir_path = os.path.dirname(dfs_map_filepath)
                
                if not self.__hdfs_ref.copy_file_from_local(local_map_filepath, dfs_dir_path, job_id):
                    self.__log.error('Unable to copy map flags back to DFS: %s', dfs_map_filepath)
                    return None, None
                
            # delete the map flags file from the local directory
            os.remove(local_map_filepath)
            
            self.__log.debug("Returning chosen maps: %s", str(return_map_task_list))
            return return_map_task_list, len(undone_maps) == 0
        
        except Exception as err_msg:
            self.__log.error("Error while choosing %d maps of job %d: %s", max_no_maps, job_id, err_msg)
            return None, None
        
        finally:
            # release the lock; runs on every exit path, including the early
            # return before the lock was taken -- assumed DistributedLock.unlock
            # tolerates unlocking a path it does not hold (TODO confirm)
            self.__dist_lock.unlock(dfs_map_filepath)
            self.__log.debug("Released lock: %s", dfs_map_filepath)
    
        
    def mark_maps_complete(self, list_of_completed_maps, job_id):
        """Mark map tasks as complete. Specify job_id and the list of map
        tasks. Return true if no other maps are left, false if some more
        maps are left, and None on failure. The list of map tasks should
        be tuple (map_id, struct_id)
            a. lock and fetch the map.flag for the particular job
            b. It will mark <dn> to 1 in the flag for the particular map.
                If already marked as done, ignore that map
            c. It will check if a certain number of maps in its group are
            done and if so create a new reduce task
        
        NOTE(review): *list_of_completed_maps* is sorted in place, mutating
        the caller's list. The reduce flags are deliberately written back
        before the map flags (see comment near the end) so a reduce-write
        failure is not reflected in the map flags.
        """
        # incase we dont need to pick any maps
        if len(list_of_completed_maps) == 0:
            return None
        
        # cache xml if not already done
        if job_id not in self.__xml_consts_cache:
            self.__cache_consts(job_id)
            
        dfs_map_filepath = self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]
        dfs_reduce_filepath = self.__xml_consts_cache[job_id][self.__dfs_reduce_flags_path]
            
        # lock map flag file
        try:
            # initialize dict
            new_reduce_tasks = {}
            newly_done_maps = {}
            inpt_mts_str = ""
            
            # sort the list of completed maps by map id (in place)
            list_of_completed_maps.sort(key=operator.itemgetter(0))
            
            # attempt to lock the map flags file
            if not self.__dist_lock.lock_and_hold(dfs_map_filepath):
                self.__log.warn('Unable to lock file for choosing a map: %s', dfs_map_filepath)
                return None
            
            self.__log.debug('Locked file for marking maps complete: %s', dfs_map_filepath)
            
            # Delete old map flag if still present
            local_map_filepath = self.__xml_consts_cache[job_id][self.__local_map_flags_path]
            if os.path.isfile(local_map_filepath):
                os.remove(local_map_filepath)
            
            # copy map flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(dfs_map_filepath, local_dir_path, job_id):
                self.__log.error('Unable to copy map flags (job_id %d) from DFS', job_id)
                return None
            
            # read the flag file
            mf_fd = open(local_map_filepath, 'r')
            mf_str = mf_fd.read()
            mf_fd.close()
            
            # the forward-only cursor works because the list is sorted by id
            # above and stubs are assumed to appear in id order -- TODO confirm
            idx = 0
            # iterate through all maps and mark which are not done as done
            for completed_map in list_of_completed_maps:
                map_id = completed_map[0]
                
                # find mt stub through its id
                map_id_str = self.__id_tag + str(map_id) + self.__end_id_tag
                idx = mf_str.find(map_id_str, idx)
                
                # find the done stub
                idx = mf_str.find(self.__dn_tag, idx)
                idx += self.__dn_len
                current_dn_mark = int(mf_str[idx:idx+1])
                
                # if not already marked as done
                if current_dn_mark == 0:
                    self.__log.info('Marking Map id %d for struct %s as done', map_id, completed_map[1])
            
                    # mark as done
                    mf_str = mf_str[:idx] + '1' + mf_str[idx+1:]
                    
                    if not isinstance(completed_map[1], str):
                        self.__log.error('The structure id given is not of string type')
                        raise TypeError('Incorrect type of the input structure id')
                    
                    # append to the list of newly done maps marked as done
                    if completed_map[1] in newly_done_maps:
                        newly_done_maps[completed_map[1]].append(completed_map[0])
                    else:
                        newly_done_maps[completed_map[1]] = [completed_map[0]]
                    
                    #print 'Mark map task complete:', map_id, 'dictionary', newly_done_maps[completed_map[1]]
                else:
                    self.__log.warning('Some other map worker has already marked map id %d as done', map_id)
            
            # recompute the status picture from the (now edited) flag string
            maps_inputted = self.__get_maps_inputted(mf_str)
            done_maps, undone_maps, failed_maps = self.__get_map_tasks_statuses(mf_str, self.__xml_consts_cache[job_id][self.__max_map_attempts])
            
            #print 'Maps inputted:', maps_inputted
            #print 'Done maps:', done_maps
            #print 'Undone maps:', undone_maps
            #print 'failed maps:', failed_maps
            
            self.__log.debug('Maps marked as failed: %s', str(failed_maps))
            
            # input ratio: how many done maps feed one new reduce
            max_map_to_red_inps = self.__xml_consts_cache[job_id][self.__max_map_to_red_inputs]
            
            #print 'Maps to reduces input ratio:', max_map_to_red_inps
            
            # iterate through all newly done map tasks, and create reduce stubs
            for struct_id in newly_done_maps:
                #print 'Creating reduce stubs for struct id:', struct_id
                
                # get maps that are already input to redcues in this struct
                maps_inputted_struct = maps_inputted.get(struct_id, [])
                
                # get maps that have been done in this struct
                done_maps_struct = done_maps.get(struct_id, [])
                
                #print 'Done maps for this struct:', done_maps_struct
                
                # take out the maps which are done but not input for reduces
                done_maps_not_input = listutils.list_diff(done_maps_struct, maps_inputted_struct)
                # done_maps_not_input = filter(lambda x : x not in maps_inputted_struct, done_maps_struct)
                
                self.__log.debug('Maps waiting to be input to reduce for struct %s: %s', struct_id, str(done_maps_not_input))
                
                # make an empty list, for the current structure
                new_reduce_tasks[struct_id] = []
                
                # build the maximum number of reduces possible
                while len(done_maps_not_input) >= max_map_to_red_inps:
                    # make a new list which would form the set of map id's for the new reduce 
                    new_reduce_inputs = done_maps_not_input[:max_map_to_red_inps]
                    
                    # adjust the remaining done maps not input
                    done_maps_not_input = done_maps_not_input[max_map_to_red_inps:]
                    
                    # add to the new reduce tasks, list of map task ids
                    new_reduce_tasks[struct_id].append(new_reduce_inputs)
                    
                    #print 'Reduce inputs:', new_reduce_tasks
                    
                    self.__log.debug('Reduce being made with map inputs for struct %s: %s', struct_id, str(new_reduce_inputs))
                
                    # append the new inputed tasks to the inpt_mts_tag string
                    # (comma-separated "map_id:struct_id" pairs)
                    for map_id in new_reduce_inputs:
                        if inpt_mts_str != '':
                            inpt_mts_str += ','
                        inpt_mts_str += str(map_id) + ':' + str(struct_id)
                        
                    #print 'Input mts string:', inpt_mts_str
                
                #print 'Remaining map dones:', done_maps_not_input

                # get maps that have been done in this struct
                undone_maps_struct = undone_maps.get(struct_id, [])
                
                # if still more maps remaining and all maps are done, build
                # one final (smaller) reduce from the leftovers
                if len(undone_maps_struct) == 0 and len(done_maps_not_input) > 0:
                    # add to the new reduce tasks, list of map task ids
                    new_reduce_tasks[struct_id].append(done_maps_not_input)

                    self.__log.debug('Reduce being made for input maps for struct %s - no undone remaining: %s', struct_id, str(done_maps_not_input))
                    
                    # append the new inputed tasks to the inpt_mts_tag string
                    for map_id in done_maps_not_input:
                        if inpt_mts_str != '':
                            inpt_mts_str += ','
                        inpt_mts_str += str(map_id) + ':' + str(struct_id)
                        
                    #print 'New reduce tasks after last input:', new_reduce_tasks
                    #print 'Input mts string after last input:', inpt_mts_str
                        
            # if there are new reduce inputs to be made
            if inpt_mts_str != '':
                # attempt to lock the reduce flags file
                if not self.__dist_lock.lock_and_hold(dfs_reduce_filepath):
                    self.__log.warn('Unable to lock file for making new reduce tasks: %s', dfs_reduce_filepath)
                    return None
                
                self.__log.debug('Locked file for making new reduces: %s', dfs_reduce_filepath)
                
                # Delete old map flag if still present
                local_reduce_filepath = self.__xml_consts_cache[job_id][self.__local_reduce_flags_path]
                if os.path.isfile(local_reduce_filepath):
                    os.remove(local_reduce_filepath)
                
                # copy map flag file to local temp dir
                local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path])
                if not self.__hdfs_ref.copy_file_to_local(dfs_reduce_filepath, local_dir_path, job_id):
                    self.__log.error('Unable to copy reduce flags (job_id %d) from DFS', job_id)
                    return None
                
                # read the flag file
                rf_fd = open(local_reduce_filepath, 'r')
                rf_str = rf_fd.read()
                rf_fd.close()
                
                # get the last id
                red_id = self.__get_next_reduce_id(rf_str)
                
                new_rfs = ''
                
                # make reduces for each structural id
                for struct_id in new_reduce_tasks:
                    #print "Struct id for reduce tasks:", struct_id
                    
                    # make the reduce stubs for each reduce task in this structure
                    for reduce_inputs in new_reduce_tasks[struct_id]:
                        self.__log.info('Making Reduce task %d map inputs: %s', red_id, reduce_inputs)
                        
                        # make inputs
                        inputs = []
                        for map_id in reduce_inputs:
                            inputs.append((constants.MAP, map_id))
                        
                        # append to the rest of the flags
                        new_rfs += self.__breaker + DfsFlags.create_reduce_flags_stub(red_id, inputs, 1, struct_id)
                        
                        # increment reduce id
                        red_id += 1
                
                # add new tags to the file, just before <inptd_rts>
                # NOTE(review): rfind returns -1 if <inptd_rts> is missing,
                # which would splice the stubs one char before the end of the
                # file -- assumed the tag is always present in reduce.flag
                idx = rf_str.rfind(self.__inpt_rts_tag)
                rf_str = rf_str[:idx] + new_rfs + rf_str[idx:]
                
                # add to the file
                rf_fd = open(local_reduce_filepath, 'w+')
                rf_fd.write(rf_str)
                rf_fd.close()

                self.__log.info('Writing new reduces to the reduce flags')
                
                # delete the old flags file
                if not self.__hdfs_ref.delete_file(dfs_reduce_filepath):
                    self.__log.error('Unable to delete the reduce flags before replacement: %s', dfs_reduce_filepath)
                    return None
                
                dfs_dir_path = os.path.dirname(dfs_reduce_filepath)
                
                if not self.__hdfs_ref.copy_file_from_local(local_reduce_filepath, dfs_dir_path, job_id):
                    self.__log.error('Unable to copy reduce flags back to DFS: %s', dfs_reduce_filepath)
                    return None
                
                # unlock the reduce flags file
                self.__dist_lock.unlock(dfs_reduce_filepath)

                # delete the map flags file from the local directory
                os.remove(local_reduce_filepath)            
            
            # Note the map flags is updated after the reduce flags since any failure in
            #    writing new reduce flags shouldn't be reflected in map flags
            # if there were some new maps done
            if len(newly_done_maps) > 0:
                # if inputs have been assigned, change the map flags
                if inpt_mts_str != '':
                    # add to inpt_mts_tag
                    idx = mf_str.rfind(self.__inpt_mts_tag)
                    idx += self.__inpt_mts_len
                    idx_2 = mf_str.find(self.__end_tag, idx)
                    maps_inptd = mf_str[idx:idx_2]
                    
                    # make the final inptd maps tag
                    if maps_inptd == '':
                        maps_inptd = inpt_mts_str
                    else:
                        maps_inptd = maps_inptd + ',' + inpt_mts_str
                        
                    # adjust in file
                    mf_str = mf_str[:idx] + maps_inptd + mf_str[idx_2:]
                
                # write updated map flag file
                mf_fd = open(local_map_filepath, 'w+')
                mf_fd.write(mf_str)
                mf_fd.close()
                
                self.__log.info('Writing completed maps to the map flags')
                            
                # delete the old flags file
                if not self.__hdfs_ref.delete_file(dfs_map_filepath):
                    self.__log.error('Unable to delete the map flags before replacement: %s', dfs_map_filepath)
                    return None
                
                dfs_dir_path = os.path.dirname(dfs_map_filepath)
                
                if not self.__hdfs_ref.copy_file_from_local(local_map_filepath, dfs_dir_path, job_id):
                    self.__log.error('Unable to copy map flags back to DFS: %s', dfs_map_filepath)
                    return None

            # delete the map flags file from the local directory
            os.remove(local_map_filepath)
            
            return len(undone_maps) == 0

        except Exception as err_msg:
            self.__log.error("Error while marking maps complete of job %d: %s", job_id, err_msg)
            return None
        
        finally:
            # release the locks; both unlocks run even if the reduce file was
            # never locked -- assumed DistributedLock.unlock tolerates that
            # (TODO confirm)
            self.__dist_lock.unlock(dfs_map_filepath)
            self.__dist_lock.unlock(dfs_reduce_filepath)
            self.__log.debug("Released locks: %s, %s", dfs_map_filepath, dfs_reduce_filepath)
    
    
    def mark_map_task_fresh(self, job_id, map_id):
        """Reset a map task to its fresh state. Specify job_id and map_id.
        Return true on success or false on failure.
        
        Intended steps (not implemented yet):
            a. lock the map flags for the job of the chosen map
            b. reset the stub as if fresh: <pu> to 0, <dn> to 0, <ts> to
            default, keeping the failed count
        """
        # Not implemented: report success unconditionally.
        return True
    
    def delete_map_task_stub(self, job_id, map_id):
        """Delete a map task stub. Specify job_id and map_id. Return true on
        success or false on failure.
        
        Intended steps (not implemented yet):
            a. lock the map flags for the job of the chosen map
            b. remove the <mt> stub from the xml
        """
        # Not implemented: report success unconditionally.
        return True
    
    
    def mark_map_task_failed(self, job_id, map_id):
        """Mark a map task as failed. Specify job_id and map_id. Return true
        on success or false on failure.
        
        Intended steps (not implemented yet):
            a. lock the map flags for the job of the chosen map
            b. mark the stub as failed: <pu> to 0, <dn> to 0, increment the
            failed count, and set <ts> to default
        """
        # Not implemented: report success unconditionally.
        return True
    
    
    def get_available_reduce_tasks(self, job_id):
        """Get a list of undone reduce tasks. Returns a list of reduce_ids,
        or None on failure.
            a. Get the reduce.flag file for the specific job
            b. parse it and collect all fresh reduces, plus expired ones
               still within the allowed number of failures

        :param job_id: identifier of the job whose reduce flags are read
        :return: list of reduce task ids available for pickup, None on failure
        """
        try:
            # cache the per-job xml constants if not already done
            if job_id not in self.__xml_consts_cache:
                self.__cache_consts(job_id)
            
            # reduce ids found to be available for pickup
            return_reduce_task_list = []
            
            # don't move forward till the reduce flags file is unlocked
            if not self.__dist_lock.block_till_locked(self.__xml_consts_cache[job_id][self.__dfs_reduce_flags_path]):
                self.__log.error('Unable to read the reduce flags (job_id %d) continuously locked', job_id)
                return None
            
            # delete a stale local copy of the reduce flags if still present
            if os.path.isfile(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path]):
                os.remove(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path])
            
            # copy reduce flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(self.__xml_consts_cache[job_id][self.__dfs_reduce_flags_path], local_dir_path, job_id):
                self.__log.error('Unable to copy reduce flags (job_id %d) from DFS', job_id)
                # BUGFIX: previously returned False, contradicting the
                # documented None-on-failure contract of every other exit path
                return None
            
            # read the whole flags file into a string; the context manager
            # guarantees the descriptor is closed even on a read error
            with open(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path], 'r') as rf_fd:
                rf_str = rf_fd.read()
            
            idx, flag = self.__get_next_reduce_flag(rf_str)
            while flag is not None:
                
                # flag fields by position: 0=id, 1=picked-up, 2=done,
                # 4=failed count, 7=timestamp -- assumed from usage; confirm
                # against __get_next_reduce_flag
                if self.__task_available(flag[1], flag[2], flag[4], \
                                         self.__xml_consts_cache[job_id][self.__max_reduce_attempts], \
                                         flag[7], self.__xml_consts_cache[job_id][self.__reduce_timeout]):
                    return_reduce_task_list.append(flag[0])
                
                # try to get next reduce task
                idx, flag = self.__get_next_reduce_flag(rf_str, idx)
            
            self.__log.debug('Returning the Undone reduce task list: %s', str(return_reduce_task_list))
                   
            # return list of undone reduce tasks
            return return_reduce_task_list
        
        except Exception as err_msg:
            self.__log.error("Error while getting list of undone map tasks: %s", err_msg)
            return None
    
    
    def choose_reduce_tasks(self, max_no_reduces, job_id):
        """Lock and choose up to max_no_reduces reduce tasks for a job.

        Returns three values:
            1. a list of tuples (reduce_id, struct_id, level,
               [inputs ([MR], task_id)]) for the reduces that were locked;
               an empty list if none could be locked; None on failure
            2. a dict mapping each completed structure id to the reduce task
               id whose output is final for that structure and should be
               written to the HDFS (only computed when value 1 is empty)
            3. True if the whole job is complete (only computed when value 1
               is empty)

        Steps:
            a. lock the reduce flags for the chosen reduces
            b. iterate through reduces and see which are available;
               if one is, change its <pu> to 1 and set its timestamp
            c. keep iterating until either enough reduces were found or no
               more reduces are available
            d. write the new reduce flags file
            e. copy it back to the DFS
            f. unlock the reduce flags
        """
        # nothing to do when the caller asked for no reduces at all
        if max_no_reduces < 1:
            return None, None, False
        
        # cache the per-job xml constants if not already done
        if job_id not in self.__xml_consts_cache:
            self.__cache_consts(job_id)
        
        dfs_map_filepath = self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]    
        dfs_reduce_filepath = self.__xml_consts_cache[job_id][self.__dfs_reduce_flags_path]
        
        # lock reduce flag file
        try:
            # initialize the return values up front so the write-back and
            # completion branches below can fill them in independently
            return_reduce_task_list = []
            reduce_phase_completed = {}
            job_done = False
            
            # don't move forward till the map flags file is unlocked
            if not self.__dist_lock.block_till_locked(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]):
                self.__log.error('Unable to read the map flags (job_id %d) continuously locked', job_id)
                return None, None, False
            
            # delete a stale local copy of the map flags if still present
            if os.path.isfile(self.__xml_consts_cache[job_id][self.__local_map_flags_path]):
                os.remove(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            
            # copy map flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path], local_dir_path, job_id):
                self.__log.error('Unable to copy map flags (job_id %d) from DFS', job_id)
                return None, None, False
            
            local_map_filepath = self.__xml_consts_cache[job_id][self.__local_map_flags_path]
            # read the whole map flags file into a string
            mf_fd = open(local_map_filepath, 'r')
            mf_str = mf_fd.read()
            mf_fd.close()
                
            # attempt to lock the reduce flags file (released in finally)
            if not self.__dist_lock.lock_and_hold(dfs_reduce_filepath):
                self.__log.warn('Unable to lock file for choosing a reduce: %s', dfs_reduce_filepath)
                return None, None, False
            
            self.__log.debug('Locked file for choosing reduce: %s', dfs_reduce_filepath)
            
            # delete a stale local copy of the reduce flags if still present
            local_reduce_filepath = self.__xml_consts_cache[job_id][self.__local_reduce_flags_path]
            if os.path.isfile(local_reduce_filepath):
                os.remove(local_reduce_filepath)
            
            # copy reduce flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(dfs_reduce_filepath, local_dir_path, job_id):
                self.__log.error('Unable to copy reduce flags (job_id %d) from DFS', job_id)
                return None, None, False
            
            # read the whole reduce flags file into a string
            rf_fd = open(local_reduce_filepath, 'r')
            rf_str = rf_fd.read()
            rf_fd.close()
            
            # idx is the parse cursor, idx2 the in-place rewrite cursor
            idx2 = 0
            idx, flag = self.__get_next_reduce_flag(rf_str)
            # keep iterating over the file until you find enough reduces or
            # reach end of file (idx == -1 presumably marks the end -- TODO
            # confirm against __get_next_reduce_flag)
            while idx != -1 and len(return_reduce_task_list) < max_no_reduces:
                
                # flag fields by position: 0=id, 1=picked-up, 2=done,
                # 3=level, 4=failed, 5=struct id, 6=inputs, 7=timestamp --
                # assumed from usage; confirm against __get_next_reduce_flag
                if self.__task_available(flag[1], flag[2], flag[4], \
                                         self.__xml_consts_cache[job_id][self.__max_reduce_attempts], \
                                         flag[7], self.__xml_consts_cache[job_id][self.__reduce_timeout]):
                    
                    # mark as picked up: flip the single-digit <pu> value to 1
                    idx2 = rf_str.find(self.__pu_tag, idx2)
                    idx2 += self.__pu_len
                    rf_str = rf_str[:idx2] + '1' + rf_str[idx2+1:]
                    
                    # overwrite the <ts> payload with the current wall-clock time
                    idx2 = rf_str.find(self.__ts_tag, idx2)
                    idx2 += self.__ts_len
                    idx3 = rf_str.find(self.__end_tag, idx2)
                    tx = str(time.time())
                    rf_str = rf_str[:idx2] + tx + rf_str[idx3:]
                    
                    # append tuple of reduce id, structural info, level and input
                    return_reduce_task_list.append((flag[0], flag[5], flag[3], flag[6]))
                    
                    # the timestamp replacement changed the string length, so
                    # shift the parse cursor by the size difference
                    idx += len(tx) - (idx3 - idx2)
                
                idx2 = idx
                
                # try to get next reduce task
                idx, flag = self.__get_next_reduce_flag(rf_str, idx)
                
            
            # if some reduces got chosen, write back the reduce file
            if len(return_reduce_task_list) > 0:
                # write updated reduce flag file
                rf_fd = open(local_reduce_filepath, 'w+')
                rf_fd.write(rf_str)
                rf_fd.close()
            
                self.__log.info('Writing chosen reduces to the reduce flags')
                
                # the DFS copy is replaced via delete-then-copy
                if not self.__hdfs_ref.delete_file(dfs_reduce_filepath):
                    self.__log.error('Unable to delete the reduce flags before replacement: %s', dfs_reduce_filepath)
                    return None, None, False
                
                dfs_dir_path = os.path.dirname(dfs_reduce_filepath)
                
                if not self.__hdfs_ref.copy_file_from_local(local_reduce_filepath, dfs_dir_path, job_id):
                    self.__log.error('Unable to copy reduce flags back to DFS: %s', dfs_reduce_filepath)
                    return None, None, False
            else:
                # no reduces could be assigned: figure out which structs have
                # completely finished and whether the whole job is done
                
                reduces_inputted = self.__get_reduces_inputted(rf_str)
                done_reduces, undone_reduces, failed_reduces = self.__get_reduce_tasks_statuses(rf_str, self.__xml_consts_cache[job_id][self.__max_reduce_attempts])
                    
                maps_inputted = self.__get_maps_inputted(mf_str)
                done_maps, undone_maps, failed_maps = self.__get_map_tasks_statuses(mf_str, self.__xml_consts_cache[job_id][self.__max_map_attempts])
                
                reduce_phase_completed, structs_remaining = self.__structs_finished(maps_inputted, done_maps, undone_maps, reduces_inputted, done_reduces, undone_reduces)
                
                # job is done when no struct needs to be computed on anymore
                job_done = len(structs_remaining) == 0
                
            # delete the map flags file from the local directory
            os.remove(local_map_filepath)
                
            # delete the reduce flags file from the local directory
            os.remove(local_reduce_filepath)
            
            self.__log.info("Returning chosen reduces: %s", str(return_reduce_task_list))
            return return_reduce_task_list, reduce_phase_completed, job_done
        
        except Exception as err_msg:
            self.__log.error("Error while choosing %d reduces of job %d: %s", max_no_reduces, job_id, err_msg)
            return None, None, False
        
        finally:
            # release both locks. NOTE(review): only the reduce flags were
            # acquired via lock_and_hold above; unlocking the map flags here
            # assumes block_till_locked leaves a lock held -- confirm its
            # semantics against ma.net.distlock
            self.__dist_lock.unlock(dfs_reduce_filepath)
            self.__dist_lock.unlock(dfs_map_filepath)
            self.__log.debug("Released lock: %s, %s", dfs_reduce_filepath, dfs_map_filepath)

        
    def mark_reduce_task_fresh(self, job_id, reduce_id):
        """Reset a reduce task so that it can be picked up again.

        Intended steps (not implemented yet):
            a. lock the reduce flags for the job of the chosen reduce
            b. restore the chosen reduce to its fresh state: <pu> to 0,
               <dn> to 0, <ts> to default, keeping the failed count

        :param job_id: identifier of the job owning the reduce task
        :param reduce_id: identifier of the reduce task to reset
        :return: True on success, False on failure
        """
        # Not implemented yet -- report success unconditionally.
        return True
    
    def delete_reduce_task_stub(self, job_id, reduce_id):
        """Remove a reduce task stub from the reduce flags.

        Intended steps (not implemented yet):
            a. lock the reduce flags for the job of the chosen reduce
            b. remove the stub <rt> element from the xml

        :param job_id: identifier of the job owning the reduce task
        :param reduce_id: identifier of the reduce task whose stub is removed
        :return: True on success, False on failure
        """
        # Not implemented yet -- report success unconditionally.
        return True


    def mark_reduces_complete(self, list_of_completed_reduces, job_id):
        """Mark reduce tasks as complete and spawn follow-up reduce stubs.

        list_of_completed_reduces is a list of (reduce_id, struct_id) pairs;
        struct_id must be a str (enforced below). Returns True if the
        reduces were successfully marked, None on failure or empty input.
            a. lock and fetch the reduce.flag for the particular job
            b. mark <dn> to 1 in the flag for each particular reduce;
               if already marked as done, ignore that reduce
            c. check whether enough reduces in a struct's group are done
               and, if so, create a new reduce task consuming their outputs
        """
        # nothing to mark when the list is empty
        if len(list_of_completed_reduces) == 0:
            return None
        
        # cache the per-job xml constants if not already done
        if job_id not in self.__xml_consts_cache:
            self.__cache_consts(job_id)
        
        dfs_map_filepath = self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]    
        dfs_reduce_filepath = self.__xml_consts_cache[job_id][self.__dfs_reduce_flags_path]
            
        # lock reduce flag file
        try:
            # new_reduce_tasks: struct_id -> list of input lists, one per new
            # reduce stub to create; newly_done_reduces: struct_id -> reduce
            # ids marked done by this call; inpt_rts_str accumulates
            # "reduce_id:struct_id" pairs for the inputted-reduces section
            new_reduce_tasks = {}
            newly_done_reduces = {}
            inpt_rts_str = ""
            # don't move forward till the map flags file is unlocked
            if not self.__dist_lock.block_till_locked(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path]):
                self.__log.error('Unable to read the map flags (job_id %d) continuously locked', job_id)
                return None
            
            # delete a stale local copy of the map flags if still present
            if os.path.isfile(self.__xml_consts_cache[job_id][self.__local_map_flags_path]):
                os.remove(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            
            # copy map flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_map_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(self.__xml_consts_cache[job_id][self.__dfs_map_flags_path], local_dir_path, job_id):
                self.__log.error('Unable to copy map flags (job_id %d) from DFS', job_id)
                # NOTE(review): returns False here while every other failure
                # path returns None -- callers should treat both as failure
                return False
            
            local_map_filepath = self.__xml_consts_cache[job_id][self.__local_map_flags_path]
            # read the whole map flags file into a string
            mf_fd = open(local_map_filepath, 'r')
            mf_str = mf_fd.read()
            mf_fd.close()
                            
            # sort by reduce id so stubs are visited in file order below
            list_of_completed_reduces.sort(key=operator.itemgetter(0))
            
            # attempt to lock the reduce flags file (released in finally)
            if not self.__dist_lock.lock_and_hold(dfs_reduce_filepath):
                self.__log.warn('Unable to lock file for choosing a reduce: %s', dfs_reduce_filepath)
                return None
            
            self.__log.debug('Locked file for marking reduces complete: %s', dfs_reduce_filepath)
            
            # delete a stale local copy of the reduce flags if still present
            local_reduce_filepath = self.__xml_consts_cache[job_id][self.__local_reduce_flags_path]
            if os.path.isfile(local_reduce_filepath):
                os.remove(local_reduce_filepath)
            
            # copy reduce flag file to local temp dir
            local_dir_path = os.path.dirname(self.__xml_consts_cache[job_id][self.__local_reduce_flags_path])
            if not self.__hdfs_ref.copy_file_to_local(dfs_reduce_filepath, local_dir_path, job_id):
                self.__log.error('Unable to copy reduce flags (job_id %d) from DFS', job_id)
                return None
            
            # read the flag file
            rf_fd = open(local_reduce_filepath, 'r')
            rf_str = rf_fd.read()
            rf_fd.close()
            
            # idx advances monotonically through the file; relies on the
            # sorted order of list_of_completed_reduces above
            idx = 0
            # iterate through all reduces and mark which are not done as done
            for completed_reduce in list_of_completed_reduces:
                reduce_id = completed_reduce[0]
                
                # find the <rt> stub through its id
                reduce_id_str = self.__id_tag + str(reduce_id) + self.__end_id_tag
                idx = rf_str.find(reduce_id_str, idx)
                
                # find the done stub
                idx = rf_str.find(self.__dn_tag, idx)
                idx += self.__dn_len
                # the <dn> payload is a single '0'/'1' digit
                current_dn_mark = int(rf_str[idx:idx+1])
                
                # if not already marked as done
                if current_dn_mark == 0:
                    self.__log.info('Marking Reduce id %d for struct %s as done', reduce_id, completed_reduce[1])
            
                    # mark as done
                    rf_str = rf_str[:idx] + '1' + rf_str[idx+1:]
                    
                    if not isinstance(completed_reduce[1], str):
                        self.__log.error('The structure id given is not of string type')
                        raise TypeError('Incorrect type of the input structure id')
                    
                    # append to the list of newly done reduces marked as done
                    if completed_reduce[1] in newly_done_reduces:
                        newly_done_reduces[completed_reduce[1]].append(completed_reduce[0])
                    else:
                        newly_done_reduces[completed_reduce[1]] = [completed_reduce[0]]
                    
                else:
                    # another worker already flipped <dn>; nothing to do
                    self.__log.warning('Some other reduce worker has already marked reduce id %d as done', reduce_id)
            
            # parse current reduce statuses out of the (updated) flags string
            reduces_inputted = self.__get_reduces_inputted(rf_str)
            done_reduces, undone_reduces, failed_reduces = self.__get_reduce_tasks_statuses(rf_str, self.__xml_consts_cache[job_id][self.__max_reduce_attempts])
            
            self.__log.debug('Reduces marked as failed: %s', str(failed_reduces))
            
            # how many done reduces feed one new (higher-level) reduce
            max_red_to_red_inps = self.__xml_consts_cache[job_id][self.__max_red_to_red_inputs]
            
            # if there are some newly done reduces, get the map statuses too
            # (undone_maps is only needed inside the loop below, which only
            # runs when newly_done_reduces is non-empty)
            if len(newly_done_reduces) > 0:
                
                done_maps, undone_maps, failed_maps = self.__get_map_tasks_statuses(mf_str, self.__xml_consts_cache[job_id][self.__max_map_attempts])
                                
                        
            # iterate through all newly done reduce tasks, and create reduce stubs
            for struct_id in newly_done_reduces:
                
                # get reduces that are already input to reduces in this struct
                reduces_inputted_struct = reduces_inputted.get(struct_id, [])
                
                # get reduces that have been done in this struct
                done_reduces_struct = done_reduces.get(struct_id, [])
                
                # take out the reduces which are done but not input to other reduces
                done_reduces_not_input = [x for x in done_reduces_struct if x[0] not in reduces_inputted_struct]
                
                self.__log.debug('Reduces waiting to be input to other reduces for struct %s: %s', struct_id, str(done_reduces_not_input))
                
                # make an empty list, for the current structure
                new_reduce_tasks[struct_id] = []
                
                # build the maximum number of reduces possible
                while len(done_reduces_not_input) >= max_red_to_red_inps:
                    # take the next batch as the input set for a new reduce
                    new_reduce_inputs = done_reduces_not_input[:max_red_to_red_inps]
                    
                    # adjust the remaining done reduces not input
                    done_reduces_not_input = done_reduces_not_input[max_red_to_red_inps:]
                    
                    # add to the new reduce tasks, list of reduce task ids and reduce levels
                    new_reduce_tasks[struct_id].append(new_reduce_inputs)
                    
                    self.__log.debug('Reduce being made with reduce inputs for struct %s: %s', struct_id, str(new_reduce_inputs))
                
                    # append the newly inputted tasks to the inpt_rts string
                    for reduce_info in new_reduce_inputs:
                        if inpt_rts_str != '':
                            inpt_rts_str += ','
                        inpt_rts_str += str(reduce_info[0]) + ':' + str(struct_id)
                        
                
                # get reduces that have been not done in this struct
                undone_reduces_struct = undone_reduces.get(struct_id, [])
                
                # get maps that have been not done in this struct
                undone_maps_struct = undone_maps.get(struct_id, [])
                
                # if still more than 1 reduce remaining and all maps and
                # reduces of this struct are done, fold the leftovers into one
                # final reduce (a single leftover is the struct's final output)
                if len(done_reduces_not_input) > 1 and len(undone_reduces_struct) == 0 and len(undone_maps_struct) == 0:
                    # add to the new reduce tasks, list of reduce task ids
                    new_reduce_tasks[struct_id].append(done_reduces_not_input)

                    self.__log.debug('Reduce being made for input reduces for struct %s - no undone remaining: %s', struct_id, str(done_reduces_not_input))
                    
                    # append the newly inputted tasks to the inpt_rts string
                    for reduce_info in done_reduces_not_input:
                        if inpt_rts_str != '':
                            inpt_rts_str += ','
                        inpt_rts_str += str(reduce_info[0]) + ':' + str(struct_id)
                        
                         
            # if there are new reduce tasks to be made
            if inpt_rts_str != '' or len(new_reduce_tasks) > 0:
                # get the next unused reduce id
                red_id = self.__get_next_reduce_id(rf_str)
                
                new_rfs = ''
                
                # make reduces for each structural id
                for struct_id in new_reduce_tasks:
                    
                    # make the reduce stubs for each reduce task in this structure
                    for reduce_inputs in new_reduce_tasks[struct_id]:
                        self.__log.info('Making Reduce task %d reduce inputs: %s', red_id, reduce_inputs)
                        
                        # build the (kind, id) input list and find the highest
                        # input level; the new reduce sits one level above it
                        max_level = 0
                        inputs = []
                        for reduce_info in reduce_inputs:
                            inputs.append((constants.REDUCE, reduce_info[0]))
                            # calculate what is the maximum level of reduces in the input
                            if reduce_info[1] > max_level:
                                max_level = reduce_info[1]
                        
                        # append to the rest of the flags
                        new_rfs += self.__breaker + DfsFlags.create_reduce_flags_stub(red_id, inputs, max_level+1, struct_id)
                        
                        # increment reduce id
                        red_id += 1
                
                # splice the new stubs in just before the <inpt_rts> section
                idx = rf_str.rfind(self.__inpt_rts_tag)
                rf_str = rf_str[:idx] + new_rfs + rf_str[idx:]
            
                # if there are reduces being set as inputs 
                # add to inpt_rts_tag
                if inpt_rts_str != '':
                    # find the inputted section in the reduce flag
                    idx = rf_str.rfind(self.__inpt_rts_tag)
                    idx += self.__inpt_rts_len
                    idx_2 = rf_str.find(self.__end_tag, idx)
                    reduces_inptd = rf_str[idx:idx_2]
                    
                    # make the final inputted reduces payload, comma separated
                    if reduces_inptd == '':
                        reduces_inptd = inpt_rts_str
                    else:
                        reduces_inptd = reduces_inptd + ',' + inpt_rts_str
                        
                    # adjust in file
                    rf_str = rf_str[:idx] + reduces_inptd + rf_str[idx_2:]
    
            
            # if there were some new reduces done or 
            # if inputs have been assigned, change the reduce flags            
            if len(newly_done_reduces) > 0 or inpt_rts_str != '' or len(new_reduce_tasks) > 0:
                
                # write the patched flags string back to the local file
                rf_fd = open(local_reduce_filepath, 'w+')
                rf_fd.write(rf_str)
                rf_fd.close()

                self.__log.info('Writing new reduces to the reduce flags')
                
                # the DFS copy is replaced via delete-then-copy
                if not self.__hdfs_ref.delete_file(dfs_reduce_filepath):
                    self.__log.error('Unable to delete the reduce flags before replacement: %s', dfs_reduce_filepath)
                    return None
                
                dfs_dir_path = os.path.dirname(dfs_reduce_filepath)
                
                if not self.__hdfs_ref.copy_file_from_local(local_reduce_filepath, dfs_dir_path, job_id):
                    self.__log.error('Unable to copy reduce flags back to DFS: %s', dfs_reduce_filepath)
                    return None
            
            
            # delete the map flags file from the local directory
            os.remove(local_map_filepath)     

            # delete the reduce flags file from the local directory
            os.remove(local_reduce_filepath)            
            
            return True

        except Exception as err_msg:
            self.__log.error("Error while marking reduces complete of job %d: %s", job_id, err_msg)
            return None
        
        finally:
            # release the reduce flags lock acquired via lock_and_hold.
            # NOTE(review): unlike choose_reduce_tasks, the map flags path is
            # not unlocked here -- confirm block_till_locked does not leave a
            # lock held
            self.__dist_lock.unlock(dfs_reduce_filepath)
            self.__log.debug("Released locks: %s", dfs_reduce_filepath)


    def mark_reduce_task_failed(self, job_id, reduce_id):
        """Record a failed attempt for a reduce task.

        Intended steps (not implemented yet):
            a. lock the reduce flags for the job of the chosen reduce
            b. mark the chosen reduce as failed: <pu> to 0, <dn> to 0,
               increment <failed>, and reset <ts> to its default

        :param job_id: identifier of the job owning the reduce task
        :param reduce_id: identifier of the failed reduce task
        :return: True on success, False on failure
        """
        # Not implemented yet -- report success unconditionally.
        return True
    

    @staticmethod
    def create_map_flags_stub(map_task_id, inputs, struct_info_tag=None):
        """Create the xml stub string for a single map task.

        :param map_task_id: numeric id of the map task
        :param inputs: list of (filename, offset, size) tuples describing the
            map task's input splits
        :param struct_info_tag: structural id of the task; defaults to
            constants.STRUCT_ID_DEFAULT when None
        :return: the formatted map stub string
        """
        if struct_info_tag is None:
            struct_info_tag = constants.STRUCT_ID_DEFAULT
        
        # build the input sections; "input_spec" avoids shadowing the
        # builtin "input" used by the original loop variable
        inputs_str = "".join(DfsFlags.__map_flag_input % input_spec for input_spec in inputs)
        
        # fill in the whole map flag stub with default pu/dn/fl/ts values
        map_flag_str = DfsFlags.__map_flag_stub % (int(map_task_id), DfsFlags.__default_pu, DfsFlags.__default_dn, DfsFlags.__default_fl, struct_info_tag, inputs_str, DfsFlags.__default_ts)
        
        return map_flag_str
    
    
    @staticmethod
    def create_reduce_flags_stub(reduce_task_id, inputs, level, struct_info_tag=None):
        """Create the xml stub string for a single reduce task.

        :param reduce_task_id: numeric id of the reduce task
        :param inputs: list of (kind, task_id) pairs; kind is the map/reduce
            marker constant, so each pair renders as e.g. "R42"
        :param level: level of the reduce in the reduce tree
        :param struct_info_tag: structural id of the task; defaults to
            constants.STRUCT_ID_DEFAULT when None
        :return: the formatted reduce stub string
        """
        if struct_info_tag is None:
            struct_info_tag = constants.STRUCT_ID_DEFAULT
        
        # comma separated "<kind><task_id>" entries; join replaces the manual
        # comma bookkeeping and avoids shadowing the builtin "input"
        inputs_str = ",".join(item[0] + str(item[1]) for item in inputs)
        
        # fill in the whole reduce flag stub with default pu/dn/fl/ts values
        reduce_flag_str = DfsFlags.__reduce_flag_stub % (int(reduce_task_id), DfsFlags.__default_pu, DfsFlags.__default_dn, level, DfsFlags.__default_fl, struct_info_tag, inputs_str, DfsFlags.__default_ts)
        
        return reduce_flag_str
    
    
    def __structs_finished(self, maps_inputted, done_maps, undone_maps, reduces_inputted, done_reduces, undone_reduces):
        """This function returns a list of structs which have ended map
        and reduce phase. The first dictionary passed shows the reduce
        structs who have completed their work, and the value shows the 
        last reduce holding the final output for that struct. The second
        value passed back gives you a list of structs whose computation
        is remaining
        """
        reduce_phase_completed = {}
        structs_remaining = []
        
        # iterate through all the done structs in done_maps
        for struct_id in done_maps:
            # check if all no maps are undone and all tasks done have been inputted
            if len(undone_maps.get(struct_id, [])) == 0 and \
                len(maps_inputted.get(struct_id, [])) == len(done_maps.get(struct_id, [])):
                    # now we have structs which have been completely shuffled to reduce phase
                    # get the last reduce task id whose output should be written to the HDFS
                    reduces_inputted_struct = reduces_inputted.get(struct_id, [])
                    done_reduces_struct = done_reduces.get(struct_id, [])
                    # check if no reduces are undone
                    if len(undone_reduces.get(struct_id, [])) == 0:
                        
                        # check if only one done reduce is not inputted
                        found = False
                        lst = []
                        # get all reduces done
                        for dr_task in done_reduces_struct:
                            lst.append(dr_task[0])
                        
                        # remove all reduces inputted
                        for ri_task in reduces_inputted_struct:
                            lst.remove(ri_task)
                        
                        if len(lst) == 1:
                            reduce_phase_completed[struct_id] = lst[0]
            
            # append struct id's which weren't done (NOTE: but whose 1 map atleast was done)
            if struct_id not in reduce_phase_completed:
                structs_remaining.append(struct_id)
        
        # append to the remaining list whose maps are not done at all
        for struct_id in undone_maps:
            if struct_id not in done_maps:
                structs_remaining.append(struct_id)
                            
        return reduce_phase_completed, structs_remaining
        
    
    def __task_available(self, picked_up, done, failed, max_failures, timestamp, expiry_period):
        """Return True when a task may still be handed to a worker.

        A task is available when it has failed fewer than max_failures
        times and either (a) it has never been picked up and is not done,
        or (b) it is not done and its pickup lease has expired (the picked
        up timestamp plus the expiry period lies in the past).
        """
        # task has exhausted its retry budget: permanently unavailable
        if failed >= max_failures:
            print("---- TASK OVER-FAILED ----")
            return False
        
        if done == 0:
            # never handed out yet -> free for pickup
            if picked_up == 0:
                return True
            # handed out, but the worker's lease has run out
            if timestamp + expiry_period < time.time():
                print("---- TASK EXPIRED ----", timestamp + expiry_period, time.time(), expiry_period)
                return True
        
        return False
            
    
    def __get_maps_inputted(self, map_flags):
        """Parse the <inptd_mts> section of the map flags and return a dict
        mapping each structure id to the list of map task ids that are done
        and have already been handed to a reduce worker.
        """
        try:
            # locate the content of the (last) <inptd_mts> element
            start = map_flags.rfind(self.__inpt_mts_tag) + self.__inpt_mts_len
            end = map_flags.find(self.__end_tag, start)
            section = map_flags[start:end]
            
            if section == '':
                return {}
            
            # entries look like "task_id:struct_id", separated by commas
            inputted = {}
            for entry in section.split(','):
                parts = entry.split(':')
                inputted.setdefault(parts[1], []).append(int(parts[0]))
            
            return inputted
        except Exception as err_msg:
            self.__log.error("Error occurred while getting list of inputted maps: %s", err_msg)
    
    
    def __get_reduces_inputted(self, reduce_flags):
        """Parse the <inptd_rts> section of the reduce flags and return a
        dict mapping each structure id to the list of reduce task ids that
        are done and have already been handed to another reduce worker.
        """
        try:
            # locate the content of the (last) <inptd_rts> element
            start = reduce_flags.rfind(self.__inpt_rts_tag) + self.__inpt_rts_len
            end = reduce_flags.find(self.__end_tag, start)
            section = reduce_flags[start:end]
            
            if section == '':
                return {}
            
            # entries look like "task_id:struct_id", separated by commas
            inputted = {}
            for entry in section.split(','):
                parts = entry.split(':')
                inputted.setdefault(parts[1], []).append(int(parts[0]))
            
            return inputted
        except Exception as err_msg:
            self.__log.error("Error occurred while getting list of inputted reduces: %s", err_msg)
    
    
    def __get_next_map_id(self, map_flags):
        """Given the full text of a map flags file, return the id to assign
        to the next map task (last id found in the file, plus one).
        """
        try:
            start = map_flags.rfind(self.__id_tag)
            if start == -1:
                # no map flag present yet -> start from the first id
                return self.__first_map_id
            start += self.__id_len
            end = map_flags.find(self.__end_tag, start)
            return 1 + int(map_flags[start:end])
        except Exception as err_msg:
            self.__log.error("Error while getting next map flag id: %s", err_msg)
            

    def __get_next_reduce_id(self, reduce_flags):
        """Given the full text of a reduce flags file, return the id to
        assign to the next reduce task (last id found in the file, plus one).
        """
        try:
            start = reduce_flags.rfind(self.__id_tag)
            if start == -1:
                # no reduce flag present yet -> start from the first id
                return self.__first_reduce_id
            start += self.__id_len
            end = reduce_flags.find(self.__end_tag, start)
            return 1 + int(reduce_flags[start:end])
        except Exception as err_msg:
            self.__log.error("Error while getting next reduce flag id: %s", err_msg)

    
    def __get_map_tasks_statuses(self, map_flags, max_allowed_map_failures):
        """Scan every map flag in *map_flags* and bucket the map ids into
        three dicts keyed by structural id:
        done, undone (still retryable) and failed (retry budget exhausted).
        Returns the tuple (done_maps, undone_maps, failed_maps).
        """
        try:
            done_maps = {}
            undone_maps = {}
            failed_maps = {}
            
            # position of the next <id> tag, -1 once exhausted
            pos = map_flags.find(self.__id_tag, 0)
            
            while pos != -1:
                # <id> ... </id>
                pos += self.__id_len
                end = map_flags.find(self.__end_tag, pos)
                task_id = int(map_flags[pos:end])
                
                # <dn> done flag is a single character (0 or 1)
                pos = map_flags.find(self.__dn_tag, pos) + self.__dn_len
                done = int(map_flags[pos:pos + 1])
                
                # <fl> number of times this task has failed
                pos = map_flags.find(self.__fl_tag, pos) + self.__fl_len
                end = map_flags.find(self.__end_tag, pos)
                failures = int(map_flags[pos:end])
                
                # <strc> structural info tag
                pos = map_flags.find(self.__strct_tag, end) + self.__strct_len
                end = map_flags.find(self.__end_tag, pos)
                struct_id = map_flags[pos:end]
                
                # bucket the task id by struct id
                if done == 1:
                    done_maps.setdefault(struct_id, []).append(task_id)
                elif failures < max_allowed_map_failures:
                    undone_maps.setdefault(struct_id, []).append(task_id)
                else:
                    failed_maps.setdefault(struct_id, []).append(task_id)
                
                # advance to the next map flag
                pos = map_flags.find(self.__id_tag, pos)
            
            return done_maps, undone_maps, failed_maps
             
        except Exception as err_msg:
            self.__log.error("Error while analyzing statuses of all maps: %s", err_msg)
    
    
    def __get_reduce_tasks_statuses(self, reduce_flags, max_allowed_reduce_failures):
        """Scan every reduce flag in *reduce_flags* and bucket the tasks into
        three dicts keyed by structural id:
        done, undone (still retryable) and failed (retry budget exhausted).
        Each dict value is a list of (reduce_id, level) tuples.
        Returns the tuple (done_reduces, undone_reduces, failed_reduces).
        """
        try:
            failed_reduces = {}
            undone_reduces = {}
            done_reduces = {}
            
            # position of the next <id> tag, -1 once exhausted
            idx = reduce_flags.find(self.__id_tag, 0)
            
            while idx != -1:
                # <id> ... </id>
                idx += self.__id_len
                idx_2 = reduce_flags.find(self.__end_tag, idx)
                task_id = int(reduce_flags[idx:idx_2])
                
                # <dn> done flag is a single character (0 or 1)
                idx = reduce_flags.find(self.__dn_tag, idx) + self.__dn_len
                done = int(reduce_flags[idx:idx + 1])
                
                # <lvl> reduce level
                idx = reduce_flags.find(self.__lvl_tag, idx) + self.__lvl_len
                idx_2 = reduce_flags.find(self.__end_tag, idx)
                level = int(reduce_flags[idx:idx_2])
                
                # <fl> number of times this task has failed
                idx = reduce_flags.find(self.__fl_tag, idx) + self.__fl_len
                idx_2 = reduce_flags.find(self.__end_tag, idx)
                failures = int(reduce_flags[idx:idx_2])
                
                # <strc> structural info tag
                idx = reduce_flags.find(self.__strct_tag, idx_2) + self.__strct_len
                idx_2 = reduce_flags.find(self.__end_tag, idx)
                struct_id = reduce_flags[idx:idx_2]
                
                # bucket the (id, level) pair by struct id
                if done == 1:
                    done_reduces.setdefault(struct_id, []).append((task_id, level))
                elif failures < max_allowed_reduce_failures:
                    undone_reduces.setdefault(struct_id, []).append((task_id, level))
                else:
                    failed_reduces.setdefault(struct_id, []).append((task_id, level))
                
                # advance to the next reduce flag
                idx = reduce_flags.find(self.__id_tag, idx)
            
            return done_reduces, undone_reduces, failed_reduces
             
        except Exception as err_msg:
            # fixed: previously logged "maps", copy-pasted from the map variant
            self.__log.error("Error while analyzing statuses of all reduces: %s", err_msg)
    
    
    def __get_next_map_flag(self, map_flags, start_indx=0):
        """This function gets the next map flag, searching from the start
        index. It returns index to where the found map flag ends and a list
        of values [id, picked_up, done, failed, structural info, 
        [tuples of input -> (filename, offset, bytes to read)], timestamp]

        Parsing is done with raw string searches (str.find) rather than an
        XML parser, so a well-formed flags file is assumed. Returns
        (-1, None) when no map flag exists after start_indx. On a parse
        error the exception is only logged, so the method implicitly
        returns None - callers must tolerate that.
        """
        try:
            # find start of next flag
            idx = map_flags.find(self.__id_tag, start_indx)
            
            # if no map flag found
            if idx == -1:
                return idx, None
            
            # move iterator forward
            idx += self.__id_len
            
            # find end of id
            idx_2 = map_flags.find(self.__end_tag, idx)
            
            # make a list starting with the id
            flag = [ int(map_flags[idx:idx_2]) ]
            
            # get picked up (assumed to be a single character: 0 or 1)
            idx = map_flags.find(self.__pu_tag, idx_2)
            idx += self.__pu_len
            flag.append( int(map_flags[idx:idx+1]) )
            
            # get done (assumed to be a single character: 0 or 1)
            idx = map_flags.find(self.__dn_tag, idx)
            idx += self.__dn_len
            flag.append( int(map_flags[idx:idx+1]) )
            
            # get number of times failed
            idx = map_flags.find(self.__fl_tag, idx)
            idx += self.__fl_len
            idx_2 = map_flags.find(self.__end_tag, idx)
            flag.append( int(map_flags[idx:idx_2]) )
            
            # get structural info tag
            idx = map_flags.find(self.__strct_tag, idx_2)
            idx += self.__strct_len
            idx_2 = map_flags.find(self.__end_tag, idx)
            flag.append( map_flags[idx:idx_2] )
            
            # get input data: the whole <inp>...</inp> section is sliced out
            # and scanned separately with its own local index (idxy)
            idx = map_flags.find(self.__inp_tag, idx_2)
            idx += self.__inp_len
            idx_2 = map_flags.find(self.__end_inp_tag, idx)
            input_section = map_flags[idx:idx_2]
            
            # get all input sections, one <in> element per input split
            inputs = []
            idxy = input_section.find(self.__in_tag)
            while idxy != -1:
                # get one input
                idxy += self.__in_len
            
                # <fn> filename of the input file
                idxy = input_section.find(self.__fn_tag, idxy)
                idxy += self.__fn_len
                idxy_2 = input_section.find(self.__end_tag, idxy)
                filename = input_section[idxy:idxy_2]
                
                # <offs> byte offset into the file
                idxy = input_section.find(self.__offs_tag, idxy_2)
                idxy += self.__offs_len
                idxy_2 = input_section.find(self.__end_tag, idxy)
                offset = int(input_section[idxy:idxy_2])
                
                # <sz> number of bytes to read
                idxy = input_section.find(self.__sz_tag, idxy_2)
                idxy += self.__sz_len
                idxy_2 = input_section.find(self.__end_tag, idxy)
                size = int(input_section[idxy:idxy_2])
                
                # make input tuple
                # NOTE(review): 'input' shadows the builtin; harmless here,
                # but worth renaming in a future cleanup
                input = (filename, offset, size)
                
                # append to list
                inputs.append(input)
                
                idxy = input_section.find(self.__in_tag, idxy_2)
            
            # append inputs to flag
            flag.append(inputs)
            
            # get timestamp data (pickup timestamp, parsed as float)
            idx = map_flags.find(self.__ts_tag, idx_2)
            idx += self.__ts_len
            idx_2 = map_flags.find(self.__end_tag, idx)
            flag.append( float(map_flags[idx:idx_2]) )
            
            # return an index just past the closing </mt> of this flag, so
            # the caller can pass it back as start_indx for the next one
            idx = map_flags.find(self.__end_mt_tag, idx_2)
            idx += self.__end_mt_len
            
            #self.__log.debug('Map flag retrieved: %s', flag)
            return idx, flag
            
        except Exception as err_msg:
            self.__log.error("Error occurred while parsing a map stub: %s", err_msg)
    
    
    def __get_next_reduce_flag(self, reduce_flags, start_indx=0):
        """This function gets the next reduce flag, searching from the start
        index. It returns index to where the found reduce flag ends and a list
        of values [id, picked_up, done, red. level, failed, structural info, 
        [tuples of input -> ([MR], task_id)], timestamp]

        Parsing is done with raw string searches (str.find) rather than an
        XML parser, so a well-formed flags file is assumed. Returns
        (-1, None) when no reduce flag exists after start_indx. On a parse
        error the exception is only logged, so the method implicitly
        returns None - callers must tolerate that.
        """
        try:
            # find start of next flag
            idx = reduce_flags.find(self.__id_tag, start_indx)
            
            # if no reduce flag found
            if idx == -1:
                return idx, None
            
            # move iterator forward
            idx += self.__id_len
            
            # find end of id
            idx_2 = reduce_flags.find(self.__end_tag, idx)
            
            # make a list starting with the id
            flag = [ int(reduce_flags[idx:idx_2]) ]
            
            # get picked up (assumed to be a single character: 0 or 1)
            idx = reduce_flags.find(self.__pu_tag, idx_2)
            idx += self.__pu_len
            flag.append( int(reduce_flags[idx:idx+1]) )
            
            # get done (assumed to be a single character: 0 or 1)
            idx = reduce_flags.find(self.__dn_tag, idx)
            idx += self.__dn_len
            flag.append( int(reduce_flags[idx:idx+1]) )
            
            # get level
            idx = reduce_flags.find(self.__lvl_tag, idx)
            idx += self.__lvl_len
            idx_2 = reduce_flags.find(self.__end_tag, idx)
            flag.append( int(reduce_flags[idx:idx_2]) )
            
            # get number of times failed
            idx = reduce_flags.find(self.__fl_tag, idx_2)
            idx += self.__fl_len
            idx_2 = reduce_flags.find(self.__end_tag, idx)
            flag.append( int(reduce_flags[idx:idx_2]) )
            
            # get structural info tag
            idx = reduce_flags.find(self.__strct_tag, idx_2)
            idx += self.__strct_len
            idx_2 = reduce_flags.find(self.__end_tag, idx)
            flag.append( reduce_flags[idx:idx_2] )
            
            # get input data: the <inp>...</inp> section is a comma-separated
            # list where each entry is a one-char source tag (map or reduce)
            # immediately followed by the producing task's id, e.g. "M12"
            idx = reduce_flags.find(self.__inp_tag, idx_2)
            idx += self.__inp_len
            idx_2 = reduce_flags.find(self.__end_inp_tag, idx)
            input_section = reduce_flags[idx:idx_2]
            
            # get all input sections
            inputs = []
            input_section = input_section.split(',')
            for input in input_section:
                # get one input
                # NOTE(review): a bad tag is only logged - the entry is still
                # appended below; also 'input' shadows the builtin
                if input[0] != constants.MAP and input[0] != constants.REDUCE: 
                    self.__log.error('The tag given for map or reduce is not correct for reduce id %d', flag[0])
                
                # append inputs to the list
                inputs.append( (input[0], int(input[1:])) )
            
            # append inputs to flag
            flag.append(inputs)
            
            # get timestamp data (pickup timestamp, parsed as float)
            idx = reduce_flags.find(self.__ts_tag, idx_2)
            idx += self.__ts_len
            idx_2 = reduce_flags.find(self.__end_tag, idx)
            flag.append( float(reduce_flags[idx:idx_2]) )
            
            # return an index just past the closing </rt> of this flag, so
            # the caller can pass it back as start_indx for the next one
            idx = reduce_flags.find(self.__end_rt_tag, idx_2)
            idx += self.__end_rt_len
            
            #self.__log.debug('Reduce flag retrieved: %s', flag)
            return idx, flag
            
        except Exception as err_msg:
            self.__log.error("Error occurred while parsing a reduce stub: %s", err_msg)
    
    
    def __cache_consts(self, job_id):
        """Pull all per-job constants from the job's XML data - flag file
        paths on the DFS and the local FS, timeout periods, retry limits
        and input ratios - and cache them in self.__xml_consts_cache,
        keyed by job_id.
        """
        try:
            xml_data = ma.const.JobsXmlData
            # all getters are evaluated before the cache entry is assigned,
            # so a failing getter leaves the cache untouched
            self.__xml_consts_cache[job_id] = {
                self.__dfs_job_flags_path: xml_data.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_flags, job_id),
                self.__dfs_map_flags_path: xml_data.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_map_flags, job_id),
                self.__dfs_reduce_flags_path: xml_data.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_reduce_flags, job_id),
                self.__local_job_flags_path: xml_data.get_filepath_str_data(ma.const.xml_local_job_flags_filepath, job_id),
                self.__local_map_flags_path: xml_data.get_filepath_str_data(ma.const.xml_local_map_flags_filepath, job_id),
                self.__local_reduce_flags_path: xml_data.get_filepath_str_data(ma.const.xml_local_reduce_flags_filepath, job_id),
                self.__map_timeout: xml_data.get_float_data(ma.const.xml_map_timeout_period, job_id),
                self.__reduce_timeout: xml_data.get_float_data(ma.const.xml_reduce_timeout_period, job_id),
                self.__max_map_attempts: xml_data.get_int_data(ma.const.xml_max_map_attempts, job_id),
                self.__max_reduce_attempts: xml_data.get_int_data(ma.const.xml_max_reduce_attempts, job_id),
                self.__max_map_to_red_inputs: xml_data.get_int_data(ma.const.xml_map_reduce_ratio, job_id),
                self.__max_red_to_red_inputs: xml_data.get_int_data(ma.const.xml_reduce_input_ratio, job_id),
            }
            
            self.__log.debug("Cached const paths of job id- %d", job_id)
        except Exception as err_msg:
            self.__log.error("Error while caching consts of DFS paths: %s", err_msg)


if __name__ == '__main__':
    # Manual smoke test: drives job id 2 through the complete map phase and
    # then the reduce phase, sleeping random amounts to simulate computing
    # and completing random subsets of the assigned tasks. Needs a live DFS.
    job_id = 2
    import ma.fs.dfs._hdfs as _hdfs
    import random
    hdfs = _hdfs.HDFS()
    print('/////////////////////////////////////////////////')
    dfl = DfsFlags(hdfs)
    print('/////////////////////////////////////////////////')
    print(DfsFlags.create_map_flags_stub(0, [('file1.out',0,1000),('file2.out',1000,2000)]))
    print('/////////////////////////////////////////////////')
    inputs = [(constants.MAP,4),(constants.REDUCE,100)]
    print(DfsFlags.create_reduce_flags_stub(3, inputs, 3))
    print('/////////////////////////////////////////////////')
    # The flag helpers return None while the distributed lock is busy, so
    # every call below is retried in a short poll loop (fixed: identity
    # comparison 'is None' instead of '== None' throughout).
    ret = dfl.get_available_map_tasks(job_id)
    while ret is None:
        print('WAIT LOOP 1 - Getting available map tasks')
        time.sleep(0.1)
        ret = dfl.get_available_map_tasks(job_id)
    
    # NOTE(review): when tasks are available, ret is forced falsy so the work
    # loop below runs; an empty result is falsy as well, so the loop is
    # entered either way - presumably intentional, confirm against DFS state
    if len(ret) > 0:
        ret = False
        
    while not ret:
        print('Choosing map tasks')
        map_tasks, done = dfl.choose_map_tasks(2, job_id)
        while map_tasks is None:
            print('WAIT LOOP 2 - Choosing map tasks')
            time.sleep(0.1)
            map_tasks, done = dfl.choose_map_tasks(2, job_id)
        
        print('Assigned map tasks:', map_tasks)
        print('Map tasks complete:', done)
        
        while len(map_tasks) > 0:
            print('--- Wait time - Computing ---')
            time.sleep(int(random.random() * 15))
        
            no_map_task_assigned = len(map_tasks)
        
            to_pick = int(random.random() * no_map_task_assigned) + 1
            
            # choose a random subset of the assigned tasks to complete
            to_complete = []
            for _ in range(to_pick):
                map_task = random.choice(map_tasks)
                to_complete.append(map_task[:2])
                map_tasks.remove(map_task)
            
            print('Completing:', to_complete)
            ret = dfl.mark_maps_complete(to_complete, job_id)
            while ret is None:
                print('WAIT LOOP 3 - Marking maps complete')
                time.sleep(0.1)
                ret = dfl.mark_maps_complete(to_complete, job_id)
    
    print('Checking if no map tasks get chosen')
    map_tasks, done = dfl.choose_map_tasks(2, job_id)
    while map_tasks is None:
        print('WAIT LOOP 2 - Choosing map tasks')
        time.sleep(0.1)
        map_tasks, done = dfl.choose_map_tasks(2, job_id)
    
    print('Assigned map tasks:', map_tasks)
    print('Map tasks complete:', done)
    ############# BRICK WALL ###############
    print('-- All maps finished --')
    
    ret = dfl.get_available_reduce_tasks(job_id)
    while ret is None:
        print('WAIT LOOP 5 - Getting available reduce tasks')
        time.sleep(0.1)
        ret = dfl.get_available_reduce_tasks(job_id)
        
    while len(ret) > 0:
        print('Choosing reduce tasks')
        reduce_tasks, reduce_tasks_complete, job_done = dfl.choose_reduce_tasks(2, job_id)
        while reduce_tasks is None:
            print('WAIT LOOP 6 - Choosing reduce tasks')
            time.sleep(0.1)
            reduce_tasks, reduce_tasks_complete, job_done = dfl.choose_reduce_tasks(2, job_id)
        
        print('Reduce tasks complete:', reduce_tasks_complete, ' - job complete:', job_done)
        print('Assigned reduce tasks:',  reduce_tasks)
        
        while len(reduce_tasks) > 0: 
            print('--- Wait time - Computing ---')
            time.sleep(int(random.random() * 15))
        
            no_reduce_task_assigned = len(reduce_tasks)
        
            to_pick = int(random.random() * no_reduce_task_assigned) + 1
            
            # choose a random subset of the assigned tasks to complete
            to_complete = []
            for _ in range(to_pick):
                reduce_task = random.choice(reduce_tasks)
                to_complete.append(reduce_task[:3])
                reduce_tasks.remove(reduce_task)
            
            print('Completing:', to_complete)
            ret = dfl.mark_reduces_complete(to_complete, job_id)
            while ret is None:
                print('WAIT LOOP 7 - Marking reduces complete')
                time.sleep(0.1)
                ret = dfl.mark_reduces_complete(to_complete, job_id)
            
        ret = dfl.get_available_reduce_tasks(job_id)
        while ret is None:
            print('WAIT LOOP 8 - Getting available reduce tasks')
            time.sleep(0.1)
            ret = dfl.get_available_reduce_tasks(job_id)
    
    print('Choosing reduce tasks')
    reduce_tasks, reduce_tasks_complete, job_done = dfl.choose_reduce_tasks(2, job_id)
    while reduce_tasks is None:
        print('WAIT LOOP 6 - Choosing reduce tasks')
        time.sleep(0.1)
        reduce_tasks, reduce_tasks_complete, job_done = dfl.choose_reduce_tasks(2, job_id)
        
    print('Reduce tasks complete:', reduce_tasks_complete, ' - job complete:', job_done)
    print('Done all reduce tasks')
    print('/////////////////////////////////////////////////')
    print('DONE JOB!!!')

